import dataclasses
import json
import logging
import pathlib
from datetime import datetime, timezone
from typing import Dict, Optional

import click
from docgen_types import Plugin
from utils import should_write_json_file

from datahub.ingestion.api.decorators import SupportStatus
from datahub.ingestion.source.source_registry import source_registry

logger = logging.getLogger(__name__)


# Sources excluded from the capability report. NOTE(review): the per-entry
# rationale is not visible in this file — these look like internal/mock or
# query-log variants that duplicate a primary source's coverage; confirm
# before adding or removing entries.
DENY_LIST = {
    "snowflake-summary",
    "snowflake-queries",
    "bigquery-queries",
    "datahub-mock-data",
}


def load_plugin_capabilities(plugin_name: str) -> Optional[Plugin]:
    """Load plugin capabilities without generating full documentation."""
    logger.debug(f"Loading capabilities for {plugin_name}")

    try:
        resolved = source_registry._ensure_not_lazy(plugin_name)
        if isinstance(resolved, Exception):
            # Record the failure but keep the overall run going.
            logger.warning(f"Plugin {plugin_name} failed to load: {resolved}")
            return None

        source_cls = source_registry.get(plugin_name)
        logger.debug(f"Source class is {source_cls}")

        # Display name: prefer the class-provided value, otherwise derive
        # one from the plugin name.
        if hasattr(source_cls, "get_platform_name"):
            display_name = source_cls.get_platform_name()
        else:
            display_name = plugin_name.title()

        platform_id = (
            source_cls.get_platform_id()
            if hasattr(source_cls, "get_platform_id")
            else None
        )
        if platform_id is None:
            logger.warning(f"Platform ID not found for {plugin_name}")
            return None

        plugin = Plugin(
            name=plugin_name,
            platform_id=platform_id,
            platform_name=display_name,
            classname=f"{source_cls.__module__}.{source_cls.__name__}",
        )

        if hasattr(source_cls, "get_support_status"):
            plugin.support_status = source_cls.get_support_status()

        if hasattr(source_cls, "get_capabilities"):
            caps = list(source_cls.get_capabilities())
            if caps:
                # Stable ordering keeps the generated JSON diff-friendly.
                caps.sort(key=lambda c: c.capability.value)
                plugin.capabilities = caps
            else:
                logger.debug(f"No capabilities defined for {plugin_name}")
                plugin.capabilities = []
        else:
            logger.debug(f"No get_capabilities method for {plugin_name}")
            plugin.capabilities = []

        return plugin

    except Exception as e:
        logger.warning(f"Failed to load capabilities for {plugin_name}: {e}")
        return None


@dataclasses.dataclass
class CapabilitySummary:
    """Summary of capabilities across all plugins."""

    # Maps plugin name -> serializable detail dict with keys: platform_id,
    # platform_name, classname, support_status (enum name or None), and
    # capabilities (list of per-capability dicts).
    plugin_details: Dict[str, Dict]  # plugin_name -> detailed info


def generate_capability_summary() -> CapabilitySummary:
    """Generate a comprehensive summary of capabilities across all plugins.

    Iterates every registered source in sorted order (skipping DENY_LIST
    entries), loads its metadata via load_plugin_capabilities, and flattens
    each plugin into a plain, JSON-serializable dict.

    Returns:
        CapabilitySummary keyed by plugin name. Plugins that fail to load
        are omitted (load_plugin_capabilities already logs a warning).
    """
    plugin_details: Dict[str, Dict] = {}

    for plugin_name in sorted(source_registry.mapping.keys()):
        if plugin_name in DENY_LIST:
            logger.info(f"Skipping {plugin_name} as it is on the deny list")
            continue

        plugin = load_plugin_capabilities(plugin_name)
        if plugin is None:
            continue

        plugin_details[plugin_name] = {
            "platform_id": plugin.platform_id,
            "platform_name": plugin.platform_name,
            "classname": plugin.classname,
            # UNKNOWN is serialized as null rather than its enum name.
            "support_status": plugin.support_status.name
            if plugin.support_status != SupportStatus.UNKNOWN
            else None,
            # Comprehension replaces the manual append loop; `or []` guards
            # against a None capabilities attribute, matching the original's
            # falsy check.
            "capabilities": [
                {
                    "capability": cap_setting.capability.name,
                    "supported": cap_setting.supported,
                    "description": cap_setting.description,
                    # list(...) instead of an identity comprehension; keep
                    # None (not []) when there are no subtype modifiers.
                    "subtype_modifier": list(cap_setting.subtype_modifier)
                    if cap_setting.subtype_modifier
                    else None,
                }
                for cap_setting in (plugin.capabilities or [])
            ],
        }

    return CapabilitySummary(
        plugin_details=plugin_details,
    )


def save_capability_report(summary: CapabilitySummary, output_dir: str) -> None:
    """Save the capability summary as JSON files, but only write if contents have changed."""

    out_dir = pathlib.Path(output_dir)
    out_dir.mkdir(parents=True, exist_ok=True)

    # Load the checked-in summary (if present) so plugins that failed to
    # load this run can fall back to their previously recorded data.
    prior_details: Dict[str, Dict] = {}
    prior_file = pathlib.Path(
        "./src/datahub/ingestion/autogenerated/capability_summary.json"
    )
    if prior_file.exists():
        try:
            prior_data = json.loads(prior_file.read_text())
            prior_details = prior_data.get("plugin_details", {})
            logger.info(
                f"Loaded existing capability data for {len(prior_details)} plugins"
            )
        except Exception as e:
            logger.warning(f"Failed to load existing capability data: {e}")

    # Any plugin present before but missing now is carried over verbatim.
    for plugin_name in set(prior_details) - set(summary.plugin_details):
        logger.warning(
            f"Plugin {plugin_name} failed to load, using existing capability data as fallback. Manually remove from capability_summary.json if you want to remove it from the report."
        )
        summary.plugin_details[plugin_name] = prior_details[plugin_name]

    payload = dataclasses.asdict(summary)
    payload["generated_by"] = "metadata-ingestion/scripts/capability_summary.py"
    payload["generated_at"] = datetime.now(timezone.utc).isoformat()

    target = out_dir / "capability_summary.json"
    if should_write_json_file(target, payload, "capability summary file"):
        target.write_text(json.dumps(payload, indent=2, sort_keys=True))
        logger.info(f"Capability summary saved to {target}")


@click.command()
@click.option(
    "--output-dir",
    type=str,
    default="./autogenerated",
    help="Output directory for capability reports",
)
@click.option(
    "--source",
    type=str,
    required=False,
    help="Generate report for specific source only",
)
def generate_capability_report(output_dir: str, source: Optional[str] = None) -> None:
    """Generate a comprehensive capability report for all ingestion sources."""

    logger.info("Starting capability report generation...")

    original_mapping = None
    if source:
        if source not in source_registry.mapping:
            logger.error(f"Source '{source}' not found in registry")
            return
        # Temporarily shrink the registry to the requested source only;
        # the full mapping is restored in the finally block below.
        original_mapping = source_registry.mapping.copy()
        source_registry.mapping = {source: original_mapping[source]}

    try:
        summary = generate_capability_summary()
        save_capability_report(summary, output_dir)

        # Fix: this previously printed the total plugin count twice; count
        # only plugins that actually declare at least one capability.
        with_capabilities = sum(
            1
            for details in summary.plugin_details.values()
            if details["capabilities"]
        )

        print("Capability Report Generation Complete")
        print("=====================================")
        print(f"Total plugins processed: {len(summary.plugin_details)}")
        print(f"Plugins with capabilities: {with_capabilities}")
        print(f"Output directory: {output_dir}")

    finally:
        # Restore only if we actually swapped the mapping out above.
        if original_mapping is not None:
            source_registry.mapping = original_mapping


if __name__ == "__main__":
    # Configure logging only when run as a script, so importing this module
    # does not clobber a host application's logging setup.
    logging.basicConfig(
        level=logging.INFO,
        format="[%(asctime)s %(levelname)-8s {%(name)s:%(lineno)d}] - %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S %Z",
    )
    generate_capability_report()
